
Add audio JSONL reader and tarred ASR dataset support#1780

Open
ssh-meister wants to merge 27 commits into main from ameister/reader

Conversation

@ssh-meister

@ssh-meister ssh-meister commented Apr 9, 2026

Description

Adds audio-focused JSONL reading, plain audio manifest reading, tarred ASR dataset support, and generic file-transfer stages to NeMo Curator.

This PR extends JsonlReader with task_type="audio" so JSONL manifests can now emit AudioTask objects directly instead of only DocumentBatch. The new audio reader path supports one-manifest-line-per-task fanout and also preserves stable _curator_dedup_id assignment via _generate_ids / _assign_ids.
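
The one-manifest-line-per-task fanout can be pictured with a minimal sketch. `SketchAudioTask` and `read_manifest_lines` are hypothetical stand-ins invented for illustration; they are not the `AudioTask` or `JsonlAudioReaderStage` classes from this PR, and ID assignment is left out.

```python
import json
from dataclasses import dataclass, field


@dataclass
class SketchAudioTask:
    """Hypothetical stand-in for AudioTask: one task wraps one manifest entry."""

    task_id: str
    data: dict = field(default_factory=dict)


def read_manifest_lines(lines, source_name="manifest.jsonl"):
    """Turn each non-empty JSONL line into its own task (fanout of one line per task)."""
    tasks = []
    for index, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue  # blank lines carry no entry
        tasks.append(SketchAudioTask(task_id=f"{source_name}_{index}", data=json.loads(line)))
    return tasks
```

The point of the sketch is only the shape of the output: a list with one task per manifest line rather than one batch per file.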

In addition, this PR adds a plain audio manifest reader for JSONL manifests that already contain real file paths or URIs (for example local paths or s3://... URIs):

  • AudioManifestReader reads JSONL manifests into AudioTasks while preserving the original audio_filepath
  • manifest entries continue to carry stable sample_key values for downstream processing and checkpoint-friendly behavior

This PR also introduces a bridge for NeMo-style tarred audio datasets:

  • TarredAudioManifestReader reads sharded manifests and matches them to tar shards by shard id
  • MaterializeTarredAudioStage extracts only the needed tar member into a local file just before path-based audio stages
  • CleanupTemporaryAudioStage removes temporary files afterwards and restores the original manifest-style audio_filepath

This avoids eager extraction of the whole tar dataset, keeps compatibility with existing path-based ASR stages, and supports both strict and permissive handling of manifest entries missing from tar shards via skip_missing_entries.
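
A rough sketch of lazy, single-member extraction using only the standard library is shown below. `extract_member` is a hypothetical helper, not the code in `MaterializeTarredAudioStage`; it assumes the tar is readable as a forward-only stream and mirrors the strict versus permissive behavior with a `skip_missing` flag.

```python
import tarfile
from pathlib import Path


def extract_member(tar_stream, member_name, out_dir, skip_missing=False):
    """Stream through a tar and write only the requested member to out_dir."""
    # "r|*" reads the archive as a forward-only stream, so nothing else is unpacked.
    with tarfile.open(fileobj=tar_stream, mode="r|*") as archive:
        for member in archive:
            if member.name != member_name or not member.isfile():
                continue
            source = archive.extractfile(member)
            out_path = Path(out_dir) / Path(member.name).name
            out_path.write_bytes(source.read())
            return out_path
    if skip_missing:
        return None  # permissive mode: the caller drops the entry
    raise FileNotFoundError(f"{member_name} not found in tar shard")
```

Because the archive is consumed as a stream, the same function would work on a local file handle or on bytes arriving from a remote source.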

TarredAudioManifestReader also follows the transport ideas used in NeMo's nemo_adapters.py, including support for fsspec-based remote access and pipe:-style specifiers, so the same reader can work with local files, S3-backed storage, and AIS-style commands such as pipe:ais get ....
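
To make the pipe:-style idea concrete, here is a simplified sketch. `read_bytes` is a hypothetical helper; unlike the streaming reader described in this PR, it buffers the whole command output in memory, and it leaves fsspec-backed URIs out entirely.

```python
import shlex
import subprocess

PIPE_PREFIX = "pipe:"


def read_bytes(spec):
    """Return the bytes behind a specifier: a 'pipe:<command>' or a local path."""
    if spec.startswith(PIPE_PREFIX):
        # Everything after 'pipe:' is treated as a command whose stdout is the data.
        command = shlex.split(spec[len(PIPE_PREFIX):])
        completed = subprocess.run(command, check=True, capture_output=True)
        return completed.stdout
    with open(spec, "rb") as handle:
        return handle.read()
```

A real streaming implementation would also have to decide how to treat a command that exits early when the consumer stops reading, which is the SIGPIPE case mentioned in the review summary.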

To support remote file workflows beyond tarred datasets, this PR also adds generic file stages under nemo_curator.stages.file_io:

  • MaterializeFilesStage
  • UploadFilesStage
  • DeleteFilesStage
  • UploadManifestStage

These stages are designed for dict-backed tasks and operate on configurable field paths, including nested paths such as artifacts.local.audio_path. If a configured field path resolves to a string path or URI, the stage can materialize, upload, or delete the corresponding file. This makes the functionality reusable outside audio-specific code while still fitting naturally into audio pipelines.
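
Resolving a dotted field path against a dict-backed task can be sketched as follows. `get_by_path` and `set_by_path` are hypothetical helpers written for illustration and are not the functions used by the stages in `nemo_curator.stages.file_io`.

```python
def get_by_path(record, field_path, default=None):
    """Follow a dotted path such as 'artifacts.local.audio_path' through nested dicts."""
    current = record
    for key in field_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def set_by_path(record, field_path, value):
    """Write a value at a dotted path, creating intermediate dicts as needed."""
    *parents, leaf = field_path.split(".")
    current = record
    for key in parents:
        current = current.setdefault(key, {})
    current[leaf] = value
```

With helpers like these, a stage only needs a source path to read from and an output path to write to, which is what keeps it independent of any audio-specific schema.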

UploadManifestStage operates on FileGroupTask outputs from writer stages, so it can be chained after JsonlWriter, ParquetWriter, ALM manifest writing, or other writers that emit file groups.

Checkpoint / resume building blocks

This PR also adds the first building blocks for audio-stage checkpointing and resume.

A stable sample_key has been introduced for AudioTask as a per-sample identity that is independent of runtime-specific task_id values. This key is now populated by all three audio reader paths:

  • JsonlReader(task_type="audio") / JsonlAudioReaderStage
  • TarredAudioManifestReaderStage
  • plain audio manifest reading via AudioManifestReader

If an input manifest entry already contains sample_key, it is preserved as-is. Otherwise, a deterministic key is derived from stable sample identity fields such as audio_filepath, tar shard/member metadata, offset, duration, and dataset_name.
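
One way to derive such a key is to hash a canonical encoding of the identity fields. The sketch below is an assumption about the general approach, not the actual `build_audio_sample_key` implementation; the field names `tar_shard` and `tar_member` and the choice of SHA-256 are illustrative.

```python
import hashlib
import json

# Illustrative field list; the real set of identity fields may differ.
STABLE_FIELDS = ("audio_filepath", "tar_shard", "tar_member", "offset", "duration", "dataset_name")


def derive_sample_key(entry):
    """Keep an existing sample_key, otherwise hash the stable identity fields."""
    existing = entry.get("sample_key")
    if existing:
        return existing
    identity = {name: entry[name] for name in STABLE_FIELDS if entry.get(name) is not None}
    # sort_keys gives a canonical encoding, so the same entry always hashes the same way.
    payload = json.dumps(identity, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Since nothing runtime-specific enters the hash, rerunning the reader on the same manifest yields the same keys, which is the property a resume mechanism relies on.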

In addition, MaterializeTarredAudioStage supports optional durable materialization via:

  • materialization_dir: str | None = None

Behavior is:

  • if materialization_dir is None, the stage writes extracted audio to temporary local files
  • if materialization_dir is set, the stage writes extracted or segmented audio to a deterministic durable path derived from sample_key

This makes the tarred-audio bridge more checkpoint/resume-friendly without changing the default behavior for existing pipelines. CleanupTemporaryAudioStage continues to clean up true temporary files, while durable materialized files are left intact and audio_filepath is still restored back to the original manifest-style path after cleanup.
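
The temporary versus durable behavior and the cleanup step can be sketched together. `choose_output_path` and `cleanup` are hypothetical helpers; the metadata keys `_temporary_audio_path` and `_manifest_audio_filepath` are taken from the sequence diagram in this PR, and the sketch assumes `sample_key` is safe to use as a file name.

```python
import tempfile
from pathlib import Path


def choose_output_path(sample_key, materialization_dir=None, suffix=".wav"):
    """Return (path, is_temporary) for the file the audio will be written to."""
    if materialization_dir is None:
        handle = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        handle.close()
        return Path(handle.name), True
    root = Path(materialization_dir)
    root.mkdir(parents=True, exist_ok=True)
    # Deterministic: the same sample_key always maps to the same durable file.
    return root / f"{sample_key}{suffix}", False


def cleanup(entry):
    """Delete a true temporary file and restore the manifest-style audio_filepath."""
    temp_path = entry.pop("_temporary_audio_path", None)
    if temp_path:
        Path(temp_path).unlink(missing_ok=True)
    original = entry.pop("_manifest_audio_filepath", None)
    if original is not None:
        entry["audio_filepath"] = original
    return entry
```

In this sketch a durable file is never recorded under `_temporary_audio_path`, so cleanup leaves it in place and a later run can reuse it.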

These changes do not yet implement full pipeline-level checkpoint orchestration, but they establish two core primitives needed for that next step:

  • a stable per-sample identifier (sample_key)
  • an optional persistent materialization location (materialization_dir)

Pipeline Examples

1. Tarred ASR pipeline

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.audio import (
    TarredAudioManifestReader,
    MaterializeTarredAudioStage,
    CleanupTemporaryAudioStage,
)
from nemo_curator.stages.audio.inference.asr_nemo import InferenceAsrNemoStage
from nemo_curator.stages.audio.metrics.get_wer import GetPairwiseWerStage
from nemo_curator.stages.audio.io import AudioToDocumentStage
from nemo_curator.stages.text.io.writer import JsonlWriter

pipeline = Pipeline(name="tarred_asr_pipeline")

pipeline.add_stage(
    TarredAudioManifestReader(
        manifest_paths="/path/to/manifests/manifest__OP_0..255_CL_.json",
        tar_paths="/path/to/audio_shards/audio__OP_0..255_CL_.tar",
        skip_missing_entries=True,
    )
)
pipeline.add_stage(MaterializeTarredAudioStage())
pipeline.add_stage(InferenceAsrNemoStage(model_name="nvidia/stt_en_fastconformer_hybrid_large_pc"))
pipeline.add_stage(GetPairwiseWerStage(text_key="text", pred_text_key="pred_text", wer_key="wer"))
pipeline.add_stage(CleanupTemporaryAudioStage())
pipeline.add_stage(AudioToDocumentStage())
pipeline.add_stage(JsonlWriter(path="/path/to/output"))
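
The `_OP_0..255_CL_` pattern in the paths above can be read as a brace range. The sketch below assumes that `_OP_` and `_CL_` stand for `{` and `}` as in NeMo's sharded-path convention; `expand_shards`, `shard_id`, and `pair_by_shard` are hypothetical helpers, not the `expand_sharded_paths` utility from this PR, and zero-padded ranges are not handled.

```python
import re

_RANGE = re.compile(r"\{(\d+)\.\.(\d+)\}")


def expand_shards(pattern):
    """Expand 'audio__OP_0..2_CL_.tar' into one path per shard index."""
    normalized = pattern.replace("_OP_", "{").replace("_CL_", "}")
    match = _RANGE.search(normalized)
    if match is None:
        return [normalized]
    start, stop = int(match.group(1)), int(match.group(2))
    head, tail = normalized[: match.start()], normalized[match.end() :]
    return [f"{head}{index}{tail}" for index in range(start, stop + 1)]


def shard_id(path):
    """Read the shard index from the digits just before the file extension."""
    match = re.search(r"(\d+)\.\w+$", path)
    return int(match.group(1)) if match else None


def pair_by_shard(manifests, tars):
    """Match each manifest shard to the tar shard with the same index."""
    tar_by_id = {shard_id(path): path for path in tars}
    return [(m, tar_by_id[shard_id(m)]) for m in manifests if shard_id(m) in tar_by_id]
```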

2. Plain remote manifest -> local materialization -> ASR -> upload result manifest

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.audio import AudioManifestReader
from nemo_curator.stages.audio.inference.asr_nemo import InferenceAsrNemoStage
from nemo_curator.stages.audio.metrics.get_wer import GetPairwiseWerStage
from nemo_curator.stages.audio.io import AudioToDocumentStage
from nemo_curator.stages.file_io import MaterializeFilesStage, UploadManifestStage
from nemo_curator.stages.text.io.writer import JsonlWriter

pipeline = Pipeline(name="remote_manifest_asr_pipeline")

pipeline.add_stage(
    AudioManifestReader(
        manifest_paths="s3://my-bucket/input_manifests/",
        files_per_partition=8,
    )
)
# Materialize remote audio objects locally so downstream path-based audio stages
# can consume them. This example overwrites audio_filepath in-place.
pipeline.add_stage(
    MaterializeFilesStage(
        source_field_path="audio_filepath",
        output_field_path="audio_filepath",
        temp_dir="/tmp/nemo_curator_audio",
    )
)
pipeline.add_stage(InferenceAsrNemoStage(model_name="nvidia/stt_en_fastconformer_hybrid_large_pc"))
pipeline.add_stage(GetPairwiseWerStage(text_key="text", pred_text_key="pred_text", wer_key="wer"))
pipeline.add_stage(AudioToDocumentStage())
pipeline.add_stage(JsonlWriter(path="/tmp/asr_results"))
pipeline.add_stage(
    UploadManifestStage(
        protocol="s3",
        bucket="my-output-bucket",
        key_prefix="asr/results",
    )
)

3. Upload files from a nested field and then delete local copies

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.audio import AudioManifestReader
from nemo_curator.stages.file_io import UploadFilesStage, DeleteFilesStage

pipeline = Pipeline(name="upload_and_cleanup_pipeline")

pipeline.add_stage(
    AudioManifestReader(
        manifest_paths="/path/to/local_manifest.jsonl",
    )
)
# Example manifest entry shape:
# {
#   "audio_filepath": "/data/source.wav",
#   "artifacts": {
#       "local": {
#           "segment_path": "/tmp/segments/sample_001.wav"
#       }
#   }
# }
pipeline.add_stage(
    UploadFilesStage(
        source_field_path="artifacts.local.segment_path",
        output_field_path="artifacts.remote.segment_uri",
        protocol="s3",
        bucket="my-artifact-bucket",
        key_prefix="segments",
    )
)
pipeline.add_stage(
    DeleteFilesStage(
        source_field_path="artifacts.local.segment_path",
    )
)

4. Upload files using a nested key field for destination naming

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.audio import AudioManifestReader
from nemo_curator.stages.file_io import UploadFilesStage

pipeline = Pipeline(name="upload_with_key_field_pipeline")

pipeline.add_stage(AudioManifestReader(manifest_paths="/path/to/manifest.jsonl"))
# Example manifest entry shape:
# {
#   "audio_filepath": "/tmp/processed.wav",
#   "storage": {
#       "target_key": "processed/run_001/sample_42.wav"
#   }
# }
pipeline.add_stage(
    UploadFilesStage(
        source_field_path="audio_filepath",
        output_field_path="uploaded_audio_uri",
        protocol="s3",
        bucket="my-output-bucket",
        key_field_path="storage.target_key",
    )
)

5. Durable tarred materialization for checkpoint-friendly runs

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.audio import (
    TarredAudioManifestReader,
    MaterializeTarredAudioStage,
    CleanupTemporaryAudioStage,
)

pipeline = Pipeline(name="durable_tarred_materialization")

pipeline.add_stage(
    TarredAudioManifestReader(
        manifest_paths="/path/to/manifests/manifest__OP_0..255_CL_.json",
        tar_paths="/path/to/audio_shards/audio__OP_0..255_CL_.tar",
    )
)

pipeline.add_stage(
    MaterializeTarredAudioStage(
        materialization_dir="/raid/materialized_audio_cache",
    )
)

# ... downstream audio stages ...

pipeline.add_stage(CleanupTemporaryAudioStage())

Signed-off-by: Sasha Meister <ameister@nvidia.com>
@ssh-meister ssh-meister requested a review from a team as a code owner April 9, 2026 14:46
@ssh-meister ssh-meister requested review from huvunvidia and removed request for a team April 9, 2026 14:46
@copy-pr-bot

copy-pr-bot Bot commented Apr 9, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@ssh-meister ssh-meister enabled auto-merge (squash) April 9, 2026 14:50
@greptile-apps
Contributor

greptile-apps Bot commented Apr 9, 2026

Greptile Summary

This PR introduces audio-focused JSONL reading (JsonlReader(task_type="audio")), a plain AudioManifestReader, a tarred ASR dataset bridge (TarredAudioManifestReader / MaterializeTarredAudioStage / CleanupTemporaryAudioStage), and generic file-IO stages (MaterializeFilesStage, UploadFilesStage, DeleteFilesStage, UploadManifestStage). It also adds a stable sample_key to AudioTask as a per-sample identity for checkpoint/resume.

All previously flagged P0/P1 issues have been addressed in this revision: _AttrDict.__setattr__/__delattr__ dead code, the sample_key serialization gap, the _PipeStream SIGPIPE crash, the strict=True zip mismatch, the missing limit passthrough, the _should_segment duration-only oversight, and the incorrect test import for CleanupTemporaryAudioStage.

Confidence Score: 5/5

Safe to merge; all previously flagged P0/P1 issues have been resolved and remaining findings are P2 suggestions.

All eight previously flagged issues across multiple review rounds have been addressed — including the _AttrDict dead-code bug, sample_key serialization loss, SIGPIPE crash, strict=True zip, missing limit passthrough, and incorrect test import. Only P2 style/design suggestions remain (sub-sample duration edge case, temp-file tracking gap, and field name shadowing).

nemo_curator/stages/audio/io/materialize.py (sub-sample duration rounding), nemo_curator/stages/file_io/files.py (temp-file cleanup tracking)

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/tasks/audio_task.py | Adds _AttrDict, build_audio_sample_key, and AudioTask with stable sample_key; all previously flagged issues (dead __setattr__/__delattr__, sample_key not written back) are fixed. |
| nemo_curator/stages/audio/io/tarred.py | Tarred manifest reader with shard matching, MaterializeTarredAudioStage, limit now surfaced; SIGPIPE fix and strict-zip issue resolved in prior review cycle. |
| nemo_curator/stages/audio/io/materialize.py | BaseAudioMaterializeStage and CleanupTemporaryAudioStage; _should_segment now correctly handles duration-only entries. Minor: tiny durations that round to 0 samples produce a silent empty file. |
| nemo_curator/stages/audio/io/manifest.py | Clean AudioManifestReader and AudioManifestReaderStage; audio_filepath is always preserved even when field filtering is active. |
| nemo_curator/stages/text/io/reader/jsonl.py | Adds JsonlAudioReaderStage and extends JsonlReader with task_type="audio"; strict=False zip and correct sample_key propagation applied. |
| nemo_curator/stages/file_io/files.py | Generic MaterializeFilesStage, UploadFilesStage, DeleteFilesStage, UploadManifestStage; temp files created by MaterializeFilesStage are not tracked in task metadata, so there is no built-in cleanup path. |
| nemo_curator/utils/remote_io.py | New PipeStream, fsspec helpers, expand_sharded_paths, copy_path, and remove_path; SIGPIPE return code tolerated correctly via _is_allowed_sigpipe_return_code. |
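
The sub-sample duration edge case noted for materialize.py can be illustrated with a small sketch. `frame_span` is a hypothetical helper showing one possible guard; it is not code from this PR, and raising an error is only one of several reasonable policies.

```python
def frame_span(offset_s, duration_s, sample_rate):
    """Convert an offset and duration in seconds to (start_frame, num_frames)."""
    start = int(round(offset_s * sample_rate))
    frames = int(round(duration_s * sample_rate))
    if frames <= 0:
        # A duration shorter than half a sample rounds to zero frames,
        # which would otherwise be written out as an empty audio file.
        raise ValueError(f"duration {duration_s}s is shorter than one sample at {sample_rate} Hz")
    return start, frames
```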

Sequence Diagram

sequenceDiagram
    participant Pipeline
    participant TarredAudioManifestReader
    participant TarredAudioManifestPartitionStage
    participant TarredAudioManifestReaderStage
    participant MaterializeTarredAudioStage
    participant CleanupTemporaryAudioStage

    Pipeline->>TarredAudioManifestReader: decompose()
    TarredAudioManifestReader-->>TarredAudioManifestPartitionStage: expand sharded manifest paths
    TarredAudioManifestPartitionStage-->>TarredAudioManifestReaderStage: FileGroupTask (manifest shard list)
    TarredAudioManifestReaderStage->>TarredAudioManifestReaderStage: match shard_id to tar_path
    TarredAudioManifestReaderStage->>TarredAudioManifestReaderStage: build_audio_sample_key per entry
    TarredAudioManifestReaderStage-->>Pipeline: list[AudioTask] (with _tar_path, _tar_member, sample_key)

    Pipeline->>MaterializeTarredAudioStage: process_batch(tasks)
    MaterializeTarredAudioStage->>MaterializeTarredAudioStage: group by (tar_path, tar_member)
    MaterializeTarredAudioStage->>MaterializeTarredAudioStage: open_binary_stream -> tarfile streaming
    MaterializeTarredAudioStage->>MaterializeTarredAudioStage: _should_segment? -> write bytes or soundfile segment
    MaterializeTarredAudioStage-->>Pipeline: tasks with audio_filepath -> local temp (or durable) path

    Pipeline->>CleanupTemporaryAudioStage: process(task)
    CleanupTemporaryAudioStage->>CleanupTemporaryAudioStage: unlink _temporary_audio_path
    CleanupTemporaryAudioStage->>CleanupTemporaryAudioStage: restore audio_filepath from _manifest_audio_filepath
    CleanupTemporaryAudioStage-->>Pipeline: AudioTask (manifest path restored)

Reviews (17): Last reviewed commit: "sample_key declaration"

Comment thread nemo_curator/stages/audio/io/tarred.py Outdated
Comment thread nemo_curator/stages/text/io/reader/jsonl.py Outdated
Comment thread nemo_curator/stages/audio/io/tarred.py
Comment thread nemo_curator/stages/audio/io/tarred.py Outdated
Signed-off-by: Sasha Meister <ameister@nvidia.com>
@greptile-apps
Contributor

greptile-apps Bot commented Apr 9, 2026

Tip:

Greploop — Automatically fix all review issues by running /greploops in Claude Code. It iterates: fix, push, re-review, repeat until 5/5 confidence.

Use the Greptile plugin for Claude Code to query reviews, search comments, and manage custom context directly from your terminal.

@Jorjeous
Member

Jorjeous commented Apr 9, 2026

@ssh-meister plz fix linter and cherry pick this to dev branch with #1780 hashtag

Signed-off-by: Sasha Meister <ameister@nvidia.com>
Comment thread nemo_curator/stages/audio/io/tarred.py Outdated
Signed-off-by: Sasha Meister <ameister@nvidia.com>
…plain manifest support, and keep tarred audio materialization on a shared core.

Signed-off-by: Sasha Meister <ameister@nvidia.com>
Comment thread nemo_curator/stages/text/io/reader/jsonl.py Outdated
Signed-off-by: Sasha Meister <ameister@nvidia.com>
Comment thread tests/stages/audio/io/test_tarred.py
Signed-off-by: Sasha Meister <ameister@nvidia.com>
Signed-off-by: Sasha Meister <ameister@nvidia.com>
Comment thread nemo_curator/tasks/audio_task.py
Signed-off-by: Sasha Meister <ameister@nvidia.com>
Comment thread tests/stages/text/io/reader/test_jsonl.py Outdated
Signed-off-by: Sasha Meister <ameister@nvidia.com>
@ssh-meister ssh-meister self-assigned this Apr 30, 2026
Signed-off-by: Sasha Meister <ameister@nvidia.com>
@ssh-meister ssh-meister changed the base branch from main to ameister/checkpointing April 30, 2026 10:24
@ssh-meister ssh-meister changed the base branch from ameister/checkpointing to main April 30, 2026 10:25
Contributor

@sarahyurick sarahyurick left a comment

Thanks!



@dataclass
class AudioManifestReader(CompositeStage[_EmptyTask, AudioTask]):
Contributor

Can we use the existing manifest reader?


@dataclass
class JsonlReader(CompositeStage[_EmptyTask, DocumentBatch]):
class JsonlAudioReaderStage(ProcessingStage[FileGroupTask, AudioTask]):
Contributor

It seems confusing to have this under stages/text. It should be under audio right?


def outputs(self) -> tuple[list[str], list[str]]:
    output_fields = list(self.fields or [])
    if self._generate_ids or self._assign_ids:
Contributor

Is there a use case for generate/assign IDs here?

Comment on lines +100 to +101
_generate_ids: bool = False
_assign_ids: bool = False
Contributor

Are these used for some audio pipelines, or are they just there to match the text version?

Comment on lines +121 to +131
if self._generate_ids or self._assign_ids:
    from nemo_curator.stages.deduplication.id_generator import get_id_generator_actor

    try:
        self.id_generator = get_id_generator_actor()
    except ValueError:
        msg = (
            "ID generator is required when self._generate_ids or self._assign_ids is True, "
            "and the actor 'id_generator' does not exist. Please start the id_generator actor."
        )
        raise RuntimeError(msg) from None
Contributor

Same comment as above.

assert result[0].data.iloc[0]["text"] == "hi"


def test_process_batch_serializes_constructor_sample_key() -> None:
Contributor

We don't need this test IMO.

Contributor

Is there a cleanup to remove any files created by these tests?

Contributor

No copyright needed on an empty file.

Contributor

Same comment as above.

Contributor

Yeah it's a bit confusing that audio tests are in the text directory.


3 participants